Spark explode array and map columns to rows


2024-04-12 02:49 | Source: compiled from the web

In this article, I will explain how to explode array (or list) and map DataFrame columns to rows using the Spark explode functions (explode, explode_outer, posexplode, posexplode_outer), with Scala examples.

When working with structured files such as JSON, Parquet, Avro, and XML, we often get data in collections like arrays, lists, and maps. In such cases, these explode functions are useful for converting collection columns to rows so that Spark can process them effectively.

Related:

How to flatten nested Struct column
How to flatten nested array column

Though I’ve explained this here with Scala, a similar method can be used to explode array and map columns to rows with PySpark, and if time permits I will cover it in the future. If you are looking for PySpark, I would still recommend reading through this article, as it gives you an idea of the Spark explode functions and their usage.

Before we start, let’s create a DataFrame with array and map fields. The snippet below creates a DataFrame with the columns “name” as StringType, “knownLanguages” as ArrayType, and “properties” as MapType.

In the code below, “spark” is an instance of SparkSession; refer to the complete code at the end to see how to create the SparkSession object.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructType}
    import spark.implicits._

    // Sample rows: the last two have a null and an empty array/map
    val arrayData = Seq(
      Row("James", List("Java","Scala"), Map("hair"->"black","eye"->"brown")),
      Row("Michael", List("Spark","Java",null), Map("hair"->"brown","eye"->null)),
      Row("Robert", List("CSharp",""), Map("hair"->"red","eye"->"")),
      Row("Washington", null, null),
      Row("Jefferson", List(), Map())
    )

    val arraySchema = new StructType()
      .add("name", StringType)
      .add("knownLanguages", ArrayType(StringType))
      .add("properties", MapType(StringType,StringType))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
    df.printSchema()
    df.show(false)

1. explode – Spark explode array or map column to rows

The Spark function explode(e: Column) turns array or map columns into rows. When an array is passed to this function, it creates a new column, named “col” by default, that contains the array elements, one per row. When a map is passed, it creates two new columns, “key” and “value”, and each map entry becomes its own row.

explode skips rows whose array or map is null or empty. In the example above, Washington and Jefferson have null or empty values in the array and map columns, so the output of the following snippets does not contain these rows. Null or empty elements inside a non-empty array or map are kept, as the rows for Michael and Robert show.
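The generated columns do not have to keep their default names. The following is a minimal sketch, not part of the original example, that renames them with Column.as; the object name, the helper names, and the aliases "language", "attribute", and "attributeValue" are assumptions chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructType}

// Hypothetical object for illustration; appName and master are assumptions.
object ExplodeAliasSketch {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("ExplodeAliasSketch")
    .master("local[1]")
    .getOrCreate()

  // A reduced version of the article's DataFrame with the same column names.
  def sample(): DataFrame = {
    val rows = Seq(
      Row("James", List("Java", "Scala"), Map("hair" -> "black", "eye" -> "brown")),
      Row("Washington", null, null),
      Row("Jefferson", List(), Map())
    )
    val schema = new StructType()
      .add("name", StringType)
      .add("knownLanguages", ArrayType(StringType))
      .add("properties", MapType(StringType, StringType))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  }

  // Array: one generated column, renamed from the default "col".
  def languages(df: DataFrame): DataFrame =
    df.select(col("name"), explode(col("knownLanguages")).as("language"))

  // Map: two generated columns, renamed from the defaults "key" and "value".
  def attributes(df: DataFrame): DataFrame =
    df.select(
      col("name"),
      explode(col("properties")).as(Seq("attribute", "attributeValue"))
    )

  def main(args: Array[String]): Unit = {
    val df = sample()
    languages(df).show(false)
    attributes(df).show(false)
    spark.stop()
  }
}
```

A single alias is enough for an array, while a map needs a sequence of two aliases because explode produces two columns for it.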

1.1 explode – array column example

    // Explode - array column example
    df.select($"name", explode($"knownLanguages"))
      .show(false)

Outputs:

    // Output
    +-------+------+
    |name   |col   |
    +-------+------+
    |James  |Java  |
    |James  |Scala |
    |Michael|Spark |
    |Michael|Java  |
    |Michael|null  |
    |Robert |CSharp|
    |Robert |      |
    +-------+------+

1.2 explode – map column example

    // Explode - map column example
    df.select($"name", explode($"properties"))
      .show(false)

Outputs:

    // Output
    +-------+----+-----+
    |name   |key |value|
    +-------+----+-----+
    |James  |hair|black|
    |James  |eye |brown|
    |Michael|hair|brown|
    |Michael|eye |null |
    |Robert |hair|red  |
    |Robert |eye |     |
    +-------+----+-----+

2. explode_outer – Create rows for each element in an array or map

The Spark SQL explode_outer(e: Column) function also creates a row for each element in the array or map column. Unlike explode, if the array or map is null or empty, explode_outer still produces one row for it, with null in the generated columns.

2.1 explode_outer – array example

    // Explode_outer - array example
    df.select($"name", explode_outer($"knownLanguages"))
      .show(false)
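To make the difference concrete, the sketch below applies both functions to the same small DataFrame and collects the names that are still present afterwards. The object name and the survivingNames helper are hypothetical and exist only for this illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, explode, explode_outer}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

// Hypothetical object for illustration; appName and master are assumptions.
object ExplodeOuterSketch {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("ExplodeOuterSketch")
    .master("local[1]")
    .getOrCreate()

  // One populated array, one null array, one empty array.
  def sample(): DataFrame = {
    val rows = Seq(
      Row("James", List("Java", "Scala")),
      Row("Washington", null),
      Row("Jefferson", List())
    )
    val schema = new StructType()
      .add("name", StringType)
      .add("knownLanguages", ArrayType(StringType))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  }

  // Names that still appear after applying the given generator function.
  def survivingNames(df: DataFrame, generator: Column => Column): Set[String] =
    df.select(col("name"), generator(col("knownLanguages")))
      .collect()
      .map(_.getString(0))
      .toSet

  def main(args: Array[String]): Unit = {
    val df = sample()
    // explode drops Washington (null array) and Jefferson (empty array).
    println(survivingNames(df, c => explode(c)))
    // explode_outer keeps all three names.
    println(survivingNames(df, c => explode_outer(c)))
    spark.stop()
  }
}
```

If every input row has to survive the transformation, for example before a later join or aggregation on name, explode_outer is usually the safer of the two.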

Outputs:

    // Output
    +----------+------+
    |name      |col   |
    +----------+------+
    |James     |Java  |
    |James     |Scala |
    |Michael   |Spark |
    |Michael   |Java  |
    |Michael   |null  |
    |Robert    |CSharp|
    |Robert    |      |
    |Washington|null  |
    |Jefferson |null  |
    +----------+------+

2.2 explode_outer – map example

    // Explode_outer - map example
    df.select($"name", explode_outer($"properties"))
      .show(false)

Outputs:

    // Output
    +----------+----+-----+
    |name      |key |value|
    +----------+----+-----+
    |James     |hair|black|
    |James     |eye |brown|
    |Michael   |hair|brown|
    |Michael   |eye |null |
    |Robert    |hair|red  |
    |Robert    |eye |     |
    |Washington|null|null |
    |Jefferson |null|null |
    +----------+----+-----+

3. posexplode – explode array or map elements to rows

posexplode(e: Column) creates a row for each element in the array and creates two columns: “pos” to hold the position of the array element and “col” to hold the actual array value. When the input column is a map, posexplode creates three columns: “pos” to hold the position of the map entry, plus “key” and “value”.

Like explode, posexplode skips rows whose array or map is null or empty. Since Washington and Jefferson have null or empty values in the array and map columns, the output of the following snippets does not contain them.

3.1 posexplode – array example

    // Posexplode - array example
    df.select($"name", posexplode($"knownLanguages"))
      .show(false)
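One common use of the position column is to pick a specific element per row. The sketch below keeps only the element at position 0 of each array; the object name, the firstLanguage helper, and the aliases "position" and "language" are assumptions made for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, posexplode}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

// Hypothetical object for illustration; appName and master are assumptions.
object PosexplodeSketch {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("PosexplodeSketch")
    .master("local[1]")
    .getOrCreate()

  def sample(): DataFrame = {
    val rows = Seq(
      Row("James", List("Java", "Scala")),
      Row("Michael", List("Spark", "Java", null))
    )
    val schema = new StructType()
      .add("name", StringType)
      .add("knownLanguages", ArrayType(StringType))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  }

  // Explode with positions, then keep only the element at position 0.
  def firstLanguage(df: DataFrame): DataFrame =
    df.select(
        col("name"),
        posexplode(col("knownLanguages")).as(Seq("position", "language"))
      )
      .where(col("position") === 0)
      .select("name", "language")

  def main(args: Array[String]): Unit = {
    firstLanguage(sample()).show(false)
    spark.stop()
  }
}
```

Positions start at 0, so this returns Java for James and Spark for Michael.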

Outputs:

    // Output
    +-------+---+------+
    |name   |pos|col   |
    +-------+---+------+
    |James  |0  |Java  |
    |James  |1  |Scala |
    |Michael|0  |Spark |
    |Michael|1  |Java  |
    |Michael|2  |null  |
    |Robert |0  |CSharp|
    |Robert |1  |      |
    +-------+---+------+

3.2 posexplode – map example

    // Posexplode - map example
    df.select($"name", posexplode($"properties"))
      .show(false)

Outputs:

    // Output
    +-------+---+----+-----+
    |name   |pos|key |value|
    +-------+---+----+-----+
    |James  |0  |hair|black|
    |James  |1  |eye |brown|
    |Michael|0  |hair|brown|
    |Michael|1  |eye |null |
    |Robert |0  |hair|red  |
    |Robert |1  |eye |     |
    +-------+---+----+-----+

4. posexplode_outer – explode array or map columns to rows

Spark posexplode_outer(e: Column) creates a row for each element in the array and creates two columns: “pos” to hold the position of the array element and “col” to hold the actual array value. Unlike posexplode, if the array or map is null or empty, posexplode_outer still returns one row, with null in both the pos and col columns. Similarly, for a null or empty map it returns a row with null in pos, key, and value.

4.1 posexplode_outer – array example

    // Posexplode_outer - array example
    df.select($"name", posexplode_outer($"knownLanguages"))
      .show(false)
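Because a null “pos” only occurs for a null or empty collection, it can serve as a marker for such rows. The following sketch, with a hypothetical object name and a hypothetical withoutLanguages helper, lists the names whose array had nothing to explode.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, posexplode_outer}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

// Hypothetical object for illustration; appName and master are assumptions.
object PosexplodeOuterSketch {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("PosexplodeOuterSketch")
    .master("local[1]")
    .getOrCreate()

  def sample(): DataFrame = {
    val rows = Seq(
      Row("Michael", List("Spark", "Java", null)),
      Row("Washington", null),
      Row("Jefferson", List())
    )
    val schema = new StructType()
      .add("name", StringType)
      .add("knownLanguages", ArrayType(StringType))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  }

  // A null element inside an array still gets a position, so a null "pos"
  // identifies only the rows whose array was null or empty.
  def withoutLanguages(df: DataFrame): Set[String] =
    df.select(col("name"), posexplode_outer(col("knownLanguages")))
      .where(col("pos").isNull)
      .collect()
      .map(_.getString(0))
      .toSet

  def main(args: Array[String]): Unit = {
    println(withoutLanguages(sample()))
    spark.stop()
  }
}
```

Michael is not reported even though his array contains a null element, because that element still has position 2.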

Outputs:

    // Output
    +----------+----+------+
    |name      |pos |col   |
    +----------+----+------+
    |James     |0   |Java  |
    |James     |1   |Scala |
    |Michael   |0   |Spark |
    |Michael   |1   |Java  |
    |Michael   |2   |null  |
    |Robert    |0   |CSharp|
    |Robert    |1   |      |
    |Washington|null|null  |
    |Jefferson |null|null  |
    +----------+----+------+

4.2 posexplode_outer – map example

    // Posexplode_outer - map example
    df.select($"name", posexplode_outer($"properties"))
      .show(false)

Outputs:

    // Output
    +----------+----+----+-----+
    |name      |pos |key |value|
    +----------+----+----+-----+
    |James     |0   |hair|black|
    |James     |1   |eye |brown|
    |Michael   |0   |hair|brown|
    |Michael   |1   |eye |null |
    |Robert    |0   |hair|red  |
    |Robert    |1   |eye |     |
    |Washington|null|null|null |
    |Jefferson |null|null|null |
    +----------+----+----+-----+

The complete example of exploding array or maps to rows

    package com.sparkbyexamples.spark.dataframe.functions

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructType}

    object ExplodeArrayAndMap {

      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder().appName("SparkByExamples.com")
          .master("local[1]")
          .getOrCreate()

        // Create DataFrame
        val arrayData = Seq(
          Row("James", List("Java","Scala","C++"), Map("hair"->"black","eye"->"brown")),
          Row("Michael", List("Spark","Java","C++",null), Map("hair"->"brown","eye"->null)),
          Row("Robert", List("CSharp","Python",""), Map("hair"->"red","eye"->"")),
          Row("Washington", null, null),
          Row("Jefferson", List(), Map())
        )

        val arraySchema = new StructType()
          .add("name", StringType)
          .add("knownLanguages", ArrayType(StringType))
          .add("properties", MapType(StringType,StringType))

        val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
        df.printSchema()
        df.show()

        // Needed for the $"column" syntax
        import spark.implicits._

        // Below are Array examples
        // Explode
        df.select($"name", explode($"knownLanguages"))
          .show()

        // Explode_outer
        df.select($"name", explode_outer($"knownLanguages"))
          .show()

        // Posexplode
        df.select($"name", posexplode($"knownLanguages"))
          .show()

        // Posexplode_outer
        df.select($"name", posexplode_outer($"knownLanguages"))
          .show()

        // Below are Map examples
        // Explode
        df.select($"name", explode($"properties"))
          .show()

        // Explode_outer
        df.select($"name", explode_outer($"properties"))
          .show()

        // Posexplode
        df.select($"name", posexplode($"properties"))
          .show()

        // Posexplode_outer
        df.select($"name", posexplode_outer($"properties"))
          .show()
      }
    }

Some common FAQs about explode functions

What is the explode function?

The Spark SQL explode function splits an array or map DataFrame column into rows. Spark defines several flavors of this function: explode_outer, which also returns a row for null and empty collections; posexplode, which adds the position of each element; and posexplode_outer, which combines both behaviors.

Difference between explode vs explode_outer

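explode creates a row for each element in the array or map column and skips rows whose array or map is null or empty, whereas explode_outer also returns a row, filled with nulls, for a null or empty array or map. Both functions keep null or empty elements that sit inside a non-empty array or map.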
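The same distinction is available when writing SQL text instead of DataFrame code. The sketch below is an illustration under assumptions, not something taken from the examples above: it uses Spark SQL's LATERAL VIEW and LATERAL VIEW OUTER clauses, and the object name, the explodeSql helper, the view name "people", and the aliases "langs" and "language" are all hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

// Hypothetical object for illustration; appName and master are assumptions.
object LateralViewSketch {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("LateralViewSketch")
    .master("local[1]")
    .getOrCreate()

  def sample(): DataFrame = {
    val rows = Seq(
      Row("James", List("Java", "Scala")),
      Row("Washington", null),
      Row("Jefferson", List())
    )
    val schema = new StructType()
      .add("name", StringType)
      .add("knownLanguages", ArrayType(StringType))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  }

  // Registers the DataFrame as a temporary view and explodes it in SQL.
  // With outer = true, rows with a null or empty array are kept.
  def explodeSql(df: DataFrame, outer: Boolean): DataFrame = {
    df.createOrReplaceTempView("people")
    val outerKeyword = if (outer) "OUTER " else ""
    spark.sql(
      s"""SELECT name, language
         |FROM people
         |LATERAL VIEW ${outerKeyword}explode(knownLanguages) langs AS language
         |""".stripMargin
    )
  }

  def main(args: Array[String]): Unit = {
    val df = sample()
    explodeSql(df, outer = false).show(false) // rows for James only
    explodeSql(df, outer = true).show(false)  // also Washington and Jefferson
    spark.stop()
  }
}
```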

Difference between explode vs posexplode

explode creates a row for each element in the array or map column, whereas posexplode does the same and additionally returns the position of each element. For an array, posexplode creates two columns: “pos” to hold the position of the array element and “col” to hold the actual array value. For a map, it creates three columns: “pos”, “key”, and “value”.

Conclusion

In this article, you have learned how to explode or convert array or map DataFrame columns to rows using the explode and posexplode SQL functions and their respective outer variants, and you have also learned the differences between these functions.

Related Articles
Spark – explode Array of Array (nested array) to rows
Spark – explode Array of Map to rows
Spark – explode Array of Struct to rows
How to Convert Struct type to Columns in Spark
Convert Struct to a Map Type in Spark
Spark – Get Size/Length of Array & Map Column
Spark – How to Convert Map into Multiple Columns

